package com.we.flink.utils;

import org.apache.flink.connector.kafka.sink.TopicSelector;

import com.alibaba.fastjson.JSONObject;

/**
 * Topic selector for string records written to Kafka.
 *
 * <p>Two routing modes are supported, chosen by {@link #same_topic}:
 *
 * <ul>
 *   <li><b>Fixed topic</b> ({@code same_topic == true}): every record is routed to {@link
 *       #topic_name}; the record payload is not inspected.
 *   <li><b>Per-table topic</b> ({@code same_topic == false}): the record is parsed as a JSON
 *       object and the value of its {@code table_name} field is used as the topic.
 * </ul>
 *
 * <p>When {@link #add_prefix} is set, {@link #topic_prefix} is prepended to the topic. Note the
 * asymmetry, which is kept for backward compatibility: in fixed-topic mode the prefix is
 * concatenated directly ({@code topic_prefix + topic_name}), whereas in per-table mode an
 * underscore separator is inserted ({@code topic_prefix + "_" + table_name}).
 */
public class WeKafkaTopicSelector implements TopicSelector<String> {
    /** When {@code true}, all records go to {@link #topic_name} and the payload is not parsed. */
    public boolean same_topic = false;

    /** Target topic used in fixed-topic mode. */
    public String topic_name;

    /** When {@code true}, {@link #topic_prefix} is prepended to the selected topic. */
    public boolean add_prefix = false;

    /** Prefix prepended to the topic when {@link #add_prefix} is {@code true}. */
    public String topic_prefix;

    /**
     * Creates a selector.
     *
     * @param same_topic whether every record is routed to {@code topic_name}
     * @param topic_name topic used when {@code same_topic} is {@code true}
     * @param add_prefix whether {@code topic_prefix} is prepended to the selected topic
     * @param topic_prefix prefix used when {@code add_prefix} is {@code true}
     */
    public WeKafkaTopicSelector(
            boolean same_topic, String topic_name, boolean add_prefix, String topic_prefix) {
        this.same_topic = same_topic;
        this.topic_name = topic_name;
        this.add_prefix = add_prefix;
        this.topic_prefix = topic_prefix;
    }

    /**
     * Returns the topic the given record should be written to.
     *
     * @param value the record; in per-table mode it must be a JSON object containing a non-empty
     *     {@code table_name} field
     * @return the selected topic, with the prefix applied when {@link #add_prefix} is set
     * @throws IllegalArgumentException in per-table mode, if the record does not parse to a JSON
     *     object or has no usable {@code table_name}
     */
    @Override
    public String apply(String value) {
        if (same_topic) {
            // Fixed-topic mode: prefix and name are joined without a separator.
            return add_prefix ? topic_prefix + topic_name : topic_name;
        }

        String tableName = extractTableName(value);
        // Per-table mode: prefix and table name are joined with "_".
        return add_prefix ? topic_prefix + "_" + tableName : tableName;
    }

    /**
     * Parses the record as JSON and returns its {@code table_name} field, failing fast instead
     * of letting a {@code null} topic (or a literal {@code "<prefix>_null"} topic) reach the sink.
     */
    private static String extractTableName(String value) {
        JSONObject input = JSONObject.parseObject(value);
        if (input == null) {
            // The parser yielded no object (e.g. null/empty payload); avoid a bare NPE below.
            throw new IllegalArgumentException(
                    "Cannot select Kafka topic: record is null or empty, expected a JSON object");
        }

        String tableName = input.getString("table_name");
        if (tableName == null || tableName.isEmpty()) {
            throw new IllegalArgumentException(
                    "Cannot select Kafka topic: record has no non-empty 'table_name' field");
        }
        return tableName;
    }
}
